use crate::source::{Source, SourceResult, SourceFormat};
use crate::conf_init::{MapResult};
use tokio::fs::File;
use tokio::io::{BufReader, AsyncBufReadExt, Lines};
use crate::data_frame::json::JsonValue;
use async_trait::async_trait;
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode};
use tokio::runtime::Runtime;
use std::collections::VecDeque;
use std::time::Duration;
use crate::data_frame::single_data::SingleData;

#[derive(Debug)]
pub struct SourceFile{
    name: String,
    file_name: String,
    format: SourceFormat,
    lines: Option<Lines<BufReader<File>>>,
}
#[derive(Deserialize,Debug)]
pub struct SourceFileConf{
    file_name: String,
}
impl SourceFile{
    pub fn new(name: &str, format: &SourceFormat, cnf: &SourceFileConf)->MapResult<Box<dyn Source>>{
        let source_file = SourceFile{
            name: name.to_string(),
            file_name: cnf.file_name.clone(),
            format: format.clone(),
            lines: None,
        };

        return MapResult::Ok(Box::new(source_file));
    }
    pub fn json_deserialization(name: String, data: Vec<String>)->Vec<SingleData> {
        let mut tmp = Vec::with_capacity(data.len());
        for d in data.into_iter(){
            let json = JsonValue::build_new(d);
            if let Ok(json) = json {
                let tmp_data = SingleData::new(json);
                tmp.push(tmp_data);
            }else{
                indicators_key_ref(Op::Add, IndicatorsMode::Source, name.as_str(), "json format err", 1);
            }
        }

        return tmp;
    }
}

#[async_trait]
impl Source for SourceFile{
    async fn init(&mut self)->bool{
        let fs = File::open(&self.file_name).await;
        match fs{
            Ok(t) => {
                let bf = BufReader::with_capacity(1_000_000_000, t);
                let lines = bf.lines();
                self.lines = Some(lines);
                return true;
            },
            Err(_) => {
                return false;
            },
        }
    }
    async fn recv(&mut self, runtime: &Runtime, timeout: Duration, burst: usize, concurrent: usize)->SourceResult {
        if let Some(ref mut lines) = self.lines {

            let mut join_set = VecDeque::new();
            let mut file_end = false;
            let mut file_err = false;

            for i in 0..concurrent{
                let mut data = Vec::with_capacity(burst);

                for j in 0..burst{
                    if let Ok(d) = lines.next_line().await{
                        if let Some(d) = d{
                            data.push(d);
                        }else{
                            //file end
                            file_end = true;
                            break;
                        }
                    }else{
                        //file read err
                        file_err = true;
                        break;
                    }
                }

                if data.len() > 0{
                    let name = self.name.clone();
                    let t = runtime.spawn(async {
                        SourceFile::json_deserialization(name, data)
                    });
                    join_set.push_back(t);
                }

                if file_end || file_err{
                    break;
                }
            }

            let mut tmp = Vec::with_capacity(concurrent*burst);

            while let Some(t) = join_set.pop_front(){
                if let Ok(t) = t.await {
                    tmp.extend_from_slice(t.as_slice());
                }else{
                    indicators_key_ref(Op::Add, IndicatorsMode::Source, self.name.as_str(), "concurrent err", 1);
                }
            }
            if tmp.len() > 0{
                return SourceResult::Ok(tmp);
            }else{
                if file_err{
                    return SourceResult::Err(());
                }else if file_end{
                    return SourceResult::Ok(tmp);
                }else{
                    return SourceResult::Ok(tmp);
                }
            }
        } else {
            return SourceResult::Err(());
        }
    }
}
